/*************************************************************************
	> File Name: pipe.c
	> Author: 
	> Mail: 
	> Created Time: 2015年01月08日 星期四 22时13分36秒
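    > note: pipe_t maintains a linked list that connects several stage_t
    > together. On creation, several stages are created, and each stage has a
    > corresponding thread (except the last one). Each thread runs
    > pipe_stage. When the program runs, it first creates the stages and
    > builds the linked list, then creates the threads. pipe_start is used to
    > feed data into the pipe. Data travels from the first stage all the way
    > to the last stage. The last stage only provides the result to main,
    > which prints it. On input, entering a number sends that number into the
    > pipe, and entering = fetches a value from the pipe.
    > The program uses two condition variables. When a stage holds no data it
    > signals "ready" and can receive data. When a stage receives data it
    > sets its data_ready state and signals "avail". On receiving "avail",
    > the stage's thread sends the data to the next stage and then marks
    > itself ready again.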
 ************************************************************************/

#include<pthread.h>
#include"errors.h"

typedef struct stage_tag stage_t;

/*
 * One node of the pipeline's singly linked list of stages.
 * `data` and `data_ready` are accessed under `mutex` by pipe_send,
 * pipe_stage and pipe_result.
 */
struct stage_tag {
    int              id;          /* index assigned by pipe_create */
    pthread_mutex_t  mutex;       /* guards data / data_ready */
    pthread_cond_t   avail;       /* signaled when a value has been stored */
    pthread_cond_t   ready;       /* signaled when the value was consumed */
    int              data_ready;  /* non-zero while `data` is unconsumed */
    long             data;        /* the value currently held */
    pthread_t        thread;      /* thread running pipe_stage (not set for the final stage) */
    stage_t         *next;        /* following stage, NULL for the final one */
};

typedef struct pipe_tag pipe_t;

/*
 * The pipeline as a whole: the stage list plus bookkeeping.
 * `active` is updated under `mutex` by pipe_start and pipe_result.
 */
struct pipe_tag {
    pthread_mutex_t  mutex;   /* protects the pipe's bookkeeping */
    stage_t         *head;    /* first stage */
    stage_t         *tail;    /* final stage */
    int              stages;  /* stage count passed to pipe_create */
    int              active;  /* values started but not yet collected */
};

/*
 * pipe_send: hand one data item to a pipe stage.
 *
 * Locks the stage's mutex and waits on the stage's "ready" condition for
 * as long as the stage still holds an unconsumed item (data_ready set).
 * It then stores `data`, sets data_ready and signals "avail" so that
 * whoever is waiting for input on this stage wakes up.
 *
 * Returns 0 on success, otherwise the non-zero status of the pthread call
 * that failed. Once the lock has been acquired, every exit path attempts
 * to unlock the mutex again.
 */
int pipe_send(stage_t *stage,long data){
    int rc = pthread_mutex_lock(&stage->mutex);
    if(rc != 0){
        return rc;
    }

    /* Block until the stage's previous item has been taken away. */
    for(;;){
        if(!stage->data_ready){
            break;
        }
        rc = pthread_cond_wait(&stage->ready,&stage->mutex);
        if(rc != 0){
            pthread_mutex_unlock(&stage->mutex);
            return rc;
        }
    }

    /* Deposit the item and announce it. */
    stage->data = data;
    stage->data_ready = 1;

    rc = pthread_cond_signal(&stage->avail);
    if(rc != 0){
        pthread_mutex_unlock(&stage->mutex);
        return rc;
    }

    return pthread_mutex_unlock(&stage->mutex);
}

/*
 * pipe_stage: thread start routine for one pipeline stage.
 *
 * arg is the stage_t this thread serves. The thread loops forever:
 * it waits on the stage's "avail" condition until data_ready is 1,
 * forwards the stage's data unchanged to the next stage with pipe_send,
 * clears data_ready and signals "ready" so a new item can be delivered.
 *
 * Any failing pthread call, including a failure reported by pipe_send,
 * is passed to err_abort (from errors.h; presumably fatal) instead of
 * being silently ignored.
 *
 * The routine never returns and never explicitly unlocks stage->mutex;
 * pthread_cond_wait releases the mutex while the thread is waiting. Note
 * that the stage's own mutex is still held while pipe_send blocks on the
 * next stage.
 */
void *pipe_stage(void* arg){
    stage_t * stage = (stage_t*)arg;
    stage_t * next_stage = stage->next;
    int status;

    if((status = pthread_mutex_lock(&stage->mutex))){
        err_abort(status,"Lock pipe stage");
    }

    while(1){
        /* Wait for an item to arrive in this stage. */
        while(stage->data_ready != 1){
            status = pthread_cond_wait(&stage->avail,&stage->mutex);
            if(status){
                err_abort(status, "Wait for previous stage");
            }
        }

        /* Pass the item on; a failed hand-off would otherwise lose it. */
        if((status = pipe_send(next_stage,stage->data))){
            err_abort(status,"Send to next stage");
        }

        /* This stage is empty again: let the sender deliver the next item. */
        stage->data_ready = 0;
        if((status = pthread_cond_signal(&stage->ready))){
            err_abort(status,"Signal stage ready");
        }
    }
    /* not reached */
}

/*
 * pipe_create: initialize `pipe` and build its pipeline.
 *
 * Allocates stages + 1 stage nodes (ids 0..stages), links them into a
 * list starting at pipe->head and ending at pipe->tail, and starts one
 * thread running pipe_stage for every node except the final one. The
 * final node is only a receptacle for the pipeline's result.
 *
 * Every node is fully initialized, including `data`, so that show_stage
 * never prints an indeterminate value for a stage that has not received
 * anything yet.
 *
 * Returns 0 on success and -1 if `stages` is negative (nothing is
 * initialized in that case). Allocation and pthread failures are passed
 * to errno_abort / err_abort from errors.h.
 */
int pipe_create(pipe_t *pipe , int stages){
    int pipe_index;
    stage_t **link = &pipe->head;
    stage_t *new_stage = NULL;
    stage_t *stage;
    int status;
    int i;

    /* A negative count would leave the list empty and the tail undefined. */
    if(stages < 0){
        return -1;
    }

    if((status = pthread_mutex_init(&pipe->mutex,NULL))){
        err_abort(status,"Init pipe mutex");
    }

    pipe->stages = stages;
    pipe->active = 0;

    /* Indices run over [0, stages], so stages + 1 nodes are created. */
    for(pipe_index = 0;pipe_index <= stages;pipe_index++){
        new_stage = malloc(sizeof *new_stage);
        if(NULL == new_stage){
            errno_abort("Allocate stage");
        }

        new_stage->id = pipe_index;
        new_stage->data_ready = 0;
        new_stage->data = 0;
        new_stage->next = NULL;

        if((status = pthread_mutex_init(&new_stage->mutex,NULL))){
            err_abort(status,"Init stage mutex");
        }
        if((status = pthread_cond_init(&new_stage->avail,NULL))){
            err_abort(status,"Init stage avail");
        }
        if((status = pthread_cond_init(&new_stage->ready,NULL))){
            err_abort(status,"Init stage ready");
        }

        /* Append: `link` always points at the slot for the next node. */
        *link = new_stage;
        link = &new_stage->next;
    }

    *link = NULL;
    pipe->tail = new_stage;

    /*
     * Create the stage threads. The last stage does not get a thread;
     * it is just a receptacle for the final pipeline value.
     */
    for(stage = pipe->head,i = 0; stage->next != NULL;stage = stage->next,i++){
        if((status = pthread_create(&stage->thread,NULL,pipe_stage,(void*)stage))){
            err_abort(status,"Create pipe stage");
        }
        printf("create new thread :%d\n",i);
    }
    return 0;
}

/*
 * pipe_start: feed one value into the first stage of the pipeline.
 *
 * The pipeline has room for pipe->stages + 1 values (one per stage node).
 * The active counter is incremented and checked against that capacity
 * entirely under pipe->mutex; if the pipeline is already full the counter
 * is clamped back to the capacity and the value is rejected.
 *
 * Returns 0 once the value has been handed to the head stage, or -1 if
 * the pipeline is full. Failing pthread calls, including a failure
 * reported by pipe_send, are passed to err_abort from errors.h.
 */
int pipe_start(pipe_t *pipe,long value){
    int status;
    int capacity;
    int attempted;
    int full = 0;

    if((status = pthread_mutex_lock(&pipe->mutex))){
        err_abort(status,"Lock pipe mutex");
    }
    puts("pipe_start locked ");

    /* Decide while the lock is held; only local snapshots are used later. */
    capacity = pipe->stages + 1;
    attempted = ++pipe->active;
    if(attempted > capacity){
        pipe->active = capacity;
        full = 1;
    }

    if((status = pthread_mutex_unlock(&pipe->mutex))){
        err_abort(status,"Unlock pipe mutex");
    }

    printf("pipe_start unlocked active %d \n",attempted);

    if(full){
        printf("pipe is full acitve :%d\n",capacity);
        return -1;
    }

    /* The value is already counted as active, so a lost send must not pass silently. */
    if((status = pipe_send(pipe->head,value))){
        err_abort(status,"Send to first stage");
    }
    return 0;
}

/*
 * pipe_result: collect the next value from the final stage.
 *
 * Under pipe->mutex, checks whether any value is active in the pipeline;
 * if so the active counter is decremented. When nothing is active the
 * function returns 0 and leaves *result untouched.
 *
 * Otherwise it waits on the tail stage's "avail" condition until the tail
 * holds a value, copies that value to *result, marks the tail empty and
 * signals "ready" so the preceding stage can deliver the next value.
 *
 * Returns 1 if a value was stored in *result, 0 if the pipe was empty.
 * Failing pthread calls are passed to err_abort from errors.h.
 */
int pipe_result(pipe_t *pipe,long *result){
    stage_t *tail = pipe->tail;
    int empty = 0;
    int status;

    if((status = pthread_mutex_lock(&pipe->mutex))){
        err_abort(status,"Lock pipe mutex");
    }

    puts("pipe_result lock");

    if(pipe->active <= 0){
        empty = 1;
    }else{
        pipe->active--;
    }

    if((status = pthread_mutex_unlock(&pipe->mutex))){
        err_abort(status,"Unlock pipe mutex");
    }
    puts("pipe_result unlock");

    if(empty){
        return 0;
    }

    if((status = pthread_mutex_lock(&tail->mutex))){
        err_abort(status,"Lock tail stage");
    }

    /* The value may still be travelling through earlier stages. */
    while(!tail->data_ready){
        if((status = pthread_cond_wait(&tail->avail,&tail->mutex))){
            err_abort(status,"Wait for tail stage");
        }
    }

    *result = tail->data;
    tail->data_ready = 0;

    if((status = pthread_cond_signal(&tail->ready))){
        err_abort(status,"Signal tail ready");
    }
    if((status = pthread_mutex_unlock(&tail->mutex))){
        err_abort(status,"Unlock tail stage");
    }

    puts("pipe_result return 1");
    return 1;
}

/*
 * show_stage: debugging aid that prints the `data` field of every stage,
 * from pipe->head to the end of the list, one line per stage prefixed
 * with its position in the list. The value is printed whether or not the
 * stage's data_ready flag is set.
 *
 * NOTE(review): the fields are read without taking the stage mutexes, so
 * the output is only a snapshot and may be stale while stage threads run.
 */
void show_stage(pipe_t * pipe){
    stage_t *cur = pipe->head;
    int pos = 0;

    while(cur != NULL){
        printf("%d : data :%ld\n",pos,cur->data);
        cur = cur->next;
        pos++;
    }
}

/*
 * main: interactive driver for the pipeline.
 *
 * Creates a pipe with pipe_create(&my_pipe, 5) and then reads lines from
 * stdin until end of input:
 *   - lines of at most one character are ignored;
 *   - a line of at most two characters starting with '=' requests the
 *     next result via pipe_result and prints it (or "Pipe is empty");
 *   - anything else is parsed as a long and fed in through pipe_start.
 * After each request or accepted value the stage contents are dumped
 * with show_stage. The program exits with status 0 when fgets reports
 * end of input or an error.
 */
int main(int argc,char*argv[]){
    pipe_t my_pipe;
    long value,result;
    char line[128];

    pipe_create(&my_pipe,5);
    printf("Enter integer values ,or \"=\" for next result\n");

    for(;;){
        size_t len;

        printf("Data> ");
        if(fgets(line,sizeof(line),stdin) == NULL){
            exit(0);
        }

        len = strlen(line);
        if(len <= 1){
            continue;
        }

        /* "=" (with its newline): fetch the next result. */
        if(len <= 2 && line[0] == '='){
            if(pipe_result(&my_pipe,&result)){
                printf("Result is %ld\n",result);
            }else{
                printf("Pipe is empty\n");
            }
            show_stage(&my_pipe);
            continue;
        }

        /* Otherwise the line must contain an integer to feed in. */
        if(sscanf(line,"%ld",&value) < 1){
            fprintf(stderr,"Enter an integer value\n");
            continue;
        }
        pipe_start(&my_pipe,value);
        show_stage(&my_pipe);
    }
    return 0;
}
